package me.seawenc.db.migration.helper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Helper for running a command on the local machine and streaming its console output
 * (stdout and stderr) line by line to a caller-supplied consumer.
 *
 * <p>Each call to {@link #executeCommand(String, Consumer)} keeps its process handle and its
 * stderr reader thread in local variables, so overlapping calls do not interfere with each other.
 *
 * <p>Software: AIC<br>
 * Date: 2020/12/13 16:22
 *
 * @author chengsheng
 * @version 1.0.0
 */
public class LocalCmdHelper {

    /**
     * Log level assigned to lines read from the process's standard output.
     * NOTE(review): left non-final to keep the public field contract unchanged.
     */
    public static String LOG_TYPE_INFO = "INFO";

    /**
     * Log level assigned to lines read from the process's standard error, and to stdout lines
     * once an error marker has been seen.
     * NOTE(review): left non-final to keep the public field contract unchanged.
     */
    public static String LOG_TYPE_ERROR = "ERROR";

    /**
     * Value forced into the child's {@code JAVA_HOME}, overriding whatever the parent has.
     * NOTE(review): deployment-specific path; consider making it configurable.
     */
    private static final String JAVA_HOME_OVERRIDE = "/data/apps/jdk-11.0.15.1";

    /**
     * Upper bound, in milliseconds, on how long we wait for the stderr reader to finish
     * delivering lines after the process has exited.
     */
    private static final long ERROR_DRAIN_TIMEOUT_MS = 2000L;

    /**
     * Runs {@code command} as a child process and blocks until it has exited.
     *
     * <p>Standard output is read on the calling thread; standard error is read on a separate
     * thread, so {@code consumer} is invoked from two threads. The child inherits the current
     * environment with {@code JAVA_HOME} overridden. Start-up failures (including a null or
     * blank command) are reported to {@code consumer} as an ERROR entry instead of being thrown.
     *
     * @param command  command line; it is tokenized by {@link Runtime#exec(String, String[])}
     * @param consumer receives one {@link ConsoleLogBean} per output line, plus status messages
     */
    public static void executeCommand(String command, Consumer<ConsoleLogBean> consumer) {
        consumer.accept(new ConsoleLogBean(LOG_TYPE_INFO, "开始执行指令:" + command));
        // Runtime.exec throws unchecked exceptions for null/empty/blank commands; report them
        // the same way as any other start-up failure.
        if (command == null || command.trim().isEmpty()) {
            consumer.accept(new ConsoleLogBean(LOG_TYPE_ERROR, "执行指令出错:指令为空"));
            return;
        }

        Runtime runtime = Runtime.getRuntime();
        Map<String, String> envs = new HashMap<>(System.getenv());
        envs.put("JAVA_HOME", JAVA_HOME_OVERRIDE);
        String[] envp = envs.entrySet().stream()
                .map(entry -> entry.getKey() + "=" + entry.getValue())
                .toArray(String[]::new);

        final Process process;
        try {
            process = runtime.exec(command, envp);
        } catch (IOException e) {
            consumer.accept(new ConsoleLogBean(LOG_TYPE_ERROR, "执行指令出错:" + e.getMessage()));
            return;
        }

        // stderr is pumped on its own thread so that a full stderr pipe cannot stall the child
        // while this thread is busy reading stdout.
        final Thread errorThread = new Thread(
                new ErrorStreamRunnable(process.getErrorStream(), consumer), "local-cmd-stderr");
        // If the JVM exits while the command is still running, make sure the child is killed.
        final Thread hook = new Thread(() -> shutdownHook(process, errorThread));
        boolean hookRegistered = false;
        try {
            runtime.addShutdownHook(hook);
            hookRegistered = true;
            errorThread.start();

            // Blocks until the child closes its stdout.
            readConsoleOutput(consumer, process.getInputStream());
            awaitExit(command, process, errorThread);
        } finally {
            // Runs on every path (including a consumer that throws) so the child process and
            // the reader thread are never left behind.
            shutdownHook(process, errorThread);
            if (hookRegistered) {
                removeHookQuietly(runtime, hook);
            }
        }
    }

    /**
     * Waits for the process to exit and for the stderr reader to deliver its remaining lines,
     * then logs a warning when the exit status is not zero.
     *
     * @param command     command line, used only in log messages
     * @param process     the running child process
     * @param errorThread thread reading the child's stderr
     */
    private static void awaitExit(String command, Process process, Thread errorThread) {
        int execStatus = -1;
        try {
            execStatus = process.waitFor();
            // Bounded wait: normally stderr hits EOF as soon as the process exits.
            errorThread.join(ERROR_DRAIN_TIMEOUT_MS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller; cleanup happens in executeCommand.
            Thread.currentThread().interrupt();
            Log.warn("本地指令执行失败!msg=" + command + "," + e.getMessage());
        }
        String errorTip = "执行指令失败，" + command + ",返回状态码：" + execStatus;
        Optionalx.ifThen(execStatus != 0, () -> Log.warn(errorTip));
    }

    /**
     * Unregisters a previously registered shutdown hook, ignoring the case where the JVM is
     * already shutting down.
     *
     * @param runtime runtime the hook was registered with
     * @param hook    the hook thread to remove
     */
    private static void removeHookQuietly(Runtime runtime, Thread hook) {
        try {
            runtime.removeShutdownHook(hook);
        } catch (IllegalStateException ignored) {
            // JVM shutdown already in progress: the hook will run (or has run) on its own.
        }
    }

    /**
     * Reads the given stream and reports every line at INFO level (until an error marker is
     * seen, see the three-argument overload).
     *
     * @param consumer receiver of the log entries
     * @param is       stream to read; closed when reading ends
     */
    private static void readConsoleOutput(Consumer<ConsoleLogBean> consumer, InputStream is) {
        readConsoleOutput(consumer, is, LOG_TYPE_INFO);
    }

    /**
     * Reads {@code is} line by line (platform default charset) and forwards each line to
     * {@code consumer}.
     *
     * <p>Lines containing {@code "SLF4J:"} are dropped. The level is sticky: once a line contains
     * one of the error markers, that line and every later line from this stream is reported as
     * ERROR. An {@link IOException} while reading is reported to the consumer as an ERROR entry.
     *
     * @param consumer receiver of the log entries
     * @param is       stream to read; closed when reading ends
     * @param logType  initial level for the lines of this stream
     */
    private static void readConsoleOutput(Consumer<ConsoleLogBean> consumer, InputStream is, String logType) {
        String level = logType;
        try (InputStreamReader isr = new InputStreamReader(is); BufferedReader br = new BufferedReader(isr)) {
            String inline;
            while ((inline = br.readLine()) != null) {
                if (inline.contains("SLF4J:")) {
                    continue;
                }
                if (hasErrorMarker(inline)) {
                    // Intentionally not reset afterwards; following lines stay at ERROR.
                    level = LOG_TYPE_ERROR;
                }
                consumer.accept(new ConsoleLogBean(level, inline));
            }
        } catch (IOException e) {
            consumer.accept(new ConsoleLogBean(LOG_TYPE_ERROR, "readConsoleOutput,报错：" + e.getMessage()));
        }
    }

    /**
     * Tells whether a console line contains one of the substrings that switch the stream to
     * ERROR level.
     *
     * @param line console line, never null
     * @return true when the line contains an error marker
     */
    private static boolean hasErrorMarker(String line) {
        return line.contains("com.alibaba.datax.common.exception.DataXException")
                || line.contains("- ERROR -")
                || line.contains("Call getNextException");
    }

    /**
     * Runnable that forwards the lines of an error stream to a consumer at ERROR level.
     */
    static class ErrorStreamRunnable implements Runnable {
        /**
         * Error stream to read.
         */
        private final InputStream is;
        /**
         * Receiver of the lines read from the error stream.
         */
        private final Consumer<ConsoleLogBean> consumer;

        /**
         * Creates a reader for the given error stream.
         *
         * @param is       error stream to read
         * @param consumer receiver of the log entries
         */
        public ErrorStreamRunnable(InputStream is, Consumer<ConsoleLogBean> consumer) {
            this.is = is;
            this.consumer = consumer;
        }

        /**
         * Reads the stream until it ends, reporting every line as ERROR.
         */
        @Override
        public void run() {
            readConsoleOutput(consumer, is, LOG_TYPE_ERROR);
        }
    }

    /**
     * Releases what a single command execution holds: interrupts the stderr reader and destroys
     * the child process. Used both as the JVM shutdown hook body and as regular cleanup; calling
     * it more than once, or after the thread/process has already finished, is harmless.
     *
     * @param process     child process to destroy
     * @param errorThread stderr reader thread to interrupt
     */
    private static void shutdownHook(Process process, Thread errorThread) {
        errorThread.interrupt();
        process.destroy();
    }

    /**
     * One line of console output (or a status message) together with its log level.
     */
    public static class ConsoleLogBean {
        /**
         * Log level: ERROR for lines from the error stream (or after an error marker),
         * INFO for lines from the normal stream.
         */
        public String type = LOG_TYPE_INFO;
        /**
         * Log content.
         */
        public String ctn = "";

        /**
         * Creates an empty INFO entry.
         */
        ConsoleLogBean() {
        }

        /**
         * Creates an INFO entry.
         *
         * @param ctn log content
         */
        ConsoleLogBean(String ctn) {
            this.ctn = ctn;
        }

        /**
         * Creates an entry with an explicit level.
         *
         * @param type log level
         * @param ctn  log content
         */
        ConsoleLogBean(String type, String ctn) {
            this.type = type;
            this.ctn = ctn;
        }

        /**
         * @return the entry formatted as {@code [type]:content}
         */
        @Override
        public String toString() {
            return String.format("[%s]:%s", type, ctn);
        }

        /**
         * Writes the content to the application log at the level matching {@link #type};
         * entries with any other type are not logged.
         */
        public void toLog() {
            if (LOG_TYPE_INFO.equals(type)) {
                Log.info(ctn);
            }
            if (LOG_TYPE_ERROR.equals(type)) {
                Log.error(ctn);
            }
        }

        /**
         * Tells whether the content looks like a JSON object, i.e. starts with <code>{</code> and
         * ends with <code>}</code>. The content itself is not parsed.
         *
         * @return true when the content is present and brace-delimited
         */
        private boolean isJsonCtn() {
            // Optionalx.isPresent is a project helper; presumably a null/empty check - TODO confirm.
            return Optionalx.isPresent(ctn) && ctn.startsWith("{") && ctn.endsWith("}");
        }

        /**
         * Tells whether the content is a tip message, i.e. starts with {@code "tip-"}.
         *
         * @return true when the content is present and has the tip prefix
         */
        private boolean isTipMsg() {
            return Optionalx.isPresent(ctn) && ctn.startsWith("tip-");
        }

        /**
         * Decides whether the content should be printed directly: tip messages are printed only
         * when {@code isFirstTip} is true; any other content is printed unless it looks like JSON.
         *
         * @param isFirstTip whether this would be the first tip printed
         * @return true when the content should be printed
         */
        public boolean canPrintCtn(boolean isFirstTip) {
            if (isTipMsg()) {
                return isFirstTip;
            }
            return !isJsonCtn();
        }

        /**
         * Tells whether the content looks like JSON and contains the given keyword.
         *
         * @param keyword substring to look for
         * @return true when the content is JSON-like and contains {@code keyword}
         */
        public boolean isJsonCtnHasKeyword(String keyword) {
            return isJsonCtn() && ctn.contains(keyword);
        }

        /**
         * @return true when this entry is at ERROR level
         */
        public boolean isErrorLog() {
            return LOG_TYPE_ERROR.equals(type);
        }

        /**
         * @return true when this entry is at INFO level
         */
        public boolean isInfoLog() {
            return LOG_TYPE_INFO.equals(type);
        }
    }
}
